package com.mimo.logic.message.grpc;

import java.util.Collection;
import java.util.stream.Collectors;

import org.springframework.beans.factory.annotation.Autowired;

import com.google.protobuf.Any;
import com.google.protobuf.StringValue;
import com.mimo.common.logic.code.StatusCode;
import com.mimo.common.logic.dto.msg.BaseDispatchMessage;
import com.mimo.common.rpc.proto.BaseResponseProto.BaseResponse;
import com.mimo.common.rpc.proto.BaseResponseProto.Plurality;
import com.mimo.common.rpc.proto.DispatchMessageProto.DispatchMessage;
import com.mimo.common.rpc.proto.LogicMessageProxyGrpc;
import com.mimo.common.rpc.proto.LogicMsgRpc.AckMessage;
import com.mimo.logic.message.service.IMessageDispatcher;

import io.grpc.stub.StreamObserver;
import net.devh.boot.grpc.server.service.GrpcService;

@GrpcService
public class LogicMessageGrpc extends LogicMessageProxyGrpc.LogicMessageProxyImplBase {

  @Autowired
  private IMessageDispatcher messageDispatcher;

  @Override
  public void getPendingMessage(StringValue request, StreamObserver<BaseResponse> responseObserver) {
    Collection<BaseDispatchMessage> msgs = messageDispatcher.loadUnacksByUserId(request.getValue());

    BaseResponse resp = BaseResponse.newBuilder()
        .setPluralityValue(Plurality.newBuilder()
            .addAllItems(msgs.stream().map(BaseDispatchMessage::convert).map(Any::pack).collect(Collectors.toList())))
        .build();

    responseObserver.onNext(resp);
    responseObserver.onCompleted();
  }

  /**
   * 上下文要返回对应的异常给grpc的消费者
   */
  @Override
  public void dispatch(DispatchMessage request, StreamObserver<BaseResponse> responseObserver) {
    StatusCode statusCode = messageDispatcher.dispatch(BaseDispatchMessage.convert(request));
    responseObserver.onNext(statusCode.toResp());
    responseObserver.onCompleted();
  }

  @Override
  public void ack(AckMessage request, StreamObserver<BaseResponse> responseObserver) {
    messageDispatcher.ack(request.getUserId(), request.getMsgId());
    responseObserver.onNext(StatusCode.getSuccessInstance());
    responseObserver.onCompleted();
  }

}
